package com.atguigu.realtime.canalclients;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.atguigu.common.constants.TopicConstant;
import com.atguigu.common.utils.KafkaClientUtil;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * Created by Smexy on 2022/6/27
 *
 * ①先创建一个客户端对象CanalConnector
 *
 * ②使用客户端对象连接 Canal server端
 *
 * ③订阅表
 *
 * ④解析订阅到的数据
 *
 * ⑤将数据写入kafka
 */
public class OrderClient {

    public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {

        /*
            ①先创建一个客户端对象CanalConnector

            SocketAddress address: 要连接的canal server的地址。
                                        参考 canal.properties 中canal.ip = hadoop103
                                            canal.port = 11111
            String destination:  配置要连接的Mysql的实例的配置文件 instantce.properties所在的父目录
                                   参考 canal.properties 中:
                                canal.destinations = example
             String username：  1.1.2 没有配置
             String password:   1.1.2 没有配置
         */
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop103", 11111), "example", null, null);


        //②使用客户端对象连接 Canal server端
        canalConnector.connect();

        //③订阅表  格式： 库名.表名
        canalConnector.subscribe("220212.order_info");

        // 24h不间断，到canal server端拉取数据
        while (true){

            //当前拉取到的一批数据
            Message message = canalConnector.get(100);

            //判断当前是否拉取到了数据，拉取到了，就打印，如果没有，歇会继续取拉
            if (message.getId() == -1){

                System.out.println("当前没有最新数据，歇5s....");

                Thread.sleep(5000);

                //跳过本次循环，开始下次循环
               continue;

            }


            //拉取到数据，就打印
            //System.out.println(message);
            List<CanalEntry.Entry> entries = message.getEntries();

            for (CanalEntry.Entry entry : entries) {

                //获取表名
                //String tableName = entry.getHeader().getTableName();

                if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)){

                    //获取数据
                    ByteString storeValue = entry.getStoreValue();

                    //解析数据
                    parseData(storeValue);
                }


            }

        }

        //④解析订阅到的数据


    }

    private static void parseData(ByteString storeValue) throws InvalidProtocolBufferException {

        /*
                反序列化   rowChange: 1条ROWDATA类型的sql语句，反序列化后的内容

                insert into table xxx values(),values(),values()
         */
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);

        if (rowChange.getEventType().equals(CanalEntry.EventType.INSERT)){

            //获取这条sql，插入的所有行
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();

            //每个rowData是其中1行
            // maxwell 采集 update   :   data:{更新后的列}， old:{更新的列，在更新前的值}
            for (CanalEntry.RowData rowData : rowDatasList) {

                //遍历一行
                JSONObject jsonObject = new JSONObject();

                //这一行变化后的列
                List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();

                for (CanalEntry.Column column : afterColumnsList) {
                    jsonObject.put(column.getName(),column.getValue());
                }

                //这一行变化前的列
                //rowData.getBeforeColumnsList();
                //System.out.println(jsonObject);

                //写入Kafka
                KafkaClientUtil.sendDataToKafka(TopicConstant.ORDER_INFO,jsonObject.toJSONString());

            }

        }

    }
}
